查看原文
其他

0004-01-03 Livy REST 提交Spark作业

01

Livy简介

我们平时提交的spark任务,通常是使用Apache Spark本身提供的spark-submit、spark-shell和Thrift Server外,Apache Livy提供了另外一种与Spark集群交互的方式,通过REST接口

此外,Apache Livy支持同时维护多个会话,可以通过REST接口、Java/Scala库和Apache Zeppelin访问Apache Livy


02

Livy基本架构

Livy是一个典型的REST服务架构,一方面接收并解析用户的REST请求,转换成相应的操作;另一方面管理着用户所启动的所有Spark集群

用户可以以REST请求方式通过Livy启动一个会话(session),一个会话是由一个spark集群所构成的,并且通过RPC协议在Spark集群和Livy服务端之间进行通信。根据处理交互方式不同,Livy将会话分为两种类型:

1.交互式会话(interactive session):跟spark的交互处理相同,在启动会话后可以接收用户所提交的代码片段,提交至远程的spark集群编译并执行

2.批处理会话(batch session):用户可以通过livy以批处理的方式启动spark应用,这样的一个方式在Livy中称之为批处理会话,与spark中的批处理是相同的


03

Livy部署

进入https://livy.apache.org/download/下载Apache Livy(注意Github有Cloudera Livy),这是两个不同的Livy,注意区分,下载成功后执行unzip解压缩即可

1.下载压缩包并解压缩

  1. wget https://www.apache.org/dyn/closer.lua/incubator/livy/0.6.0-incubating/apache-livy-0.6.0-incubating-bin.zip


  2. unzip apache-livy-0.6.0-incubating-bin.zip


  3. mv apache-livy-0.6.0-incubating-bin /usr/local/livy

2.配置

  1. cd /usr/local/livy/conf

  2. cp livy-env.sh.template livy-env.sh

  3. vi livy-env.sh

  4. export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop/conf

  5. export SPARK_HOME=$SPARK_HOME


  6. cp livy.conf.template livy.conf

  7. vi livy.conf

  8. livy.spark.master=yarn-client #使用yarn模式

  9. livy.spark.deploy-mode = client

  10. livy.repl.enable-hive-context = true

  11. livy.server.session.kind=yarn

  12. livy.server.recovery.mode = recovery #恢复机制

  13. livy.server.recovery.state-store = zookeeper / filesystem #有两种保存机制,一种是文件,一种是zookeeper

  14. livy.server.recovery.state-store.url = zookeeper_host:port

  15. livy.file.upload.max.size=107059609600 # livy上传文件限制,默认上限是100M,当Jar包大于100M时候,会抛出异常Too large request or file

其他可选的配置有:

  1. livy.server.host #主机地址,默认为 0.0.0.0;

  2. livy.server.port #端口号,默认为 8998;

  3. livy.server.session.timeout-check # 是否检测会话超时,默认为 true;

  4. livy.server.session.timeout # 会话超时时间,默认为 1h;

  5. livy.server.session.state-retain.sec # 已完成会话保留时间,默认为 600s;

  6. livy.rsc.jars # RSC JAR 包位置,缓存在 HDFS 上,可以加速会话的启动速度;

  7. livy.repl.jars # REPL JAR 包位置,缓存在 HDFS 上,可以加速会话的启动速度

  8. livy.server.yarn.poll-interval # YARN 状态刷新频率,默认为 5s;

  9. livy.ui.enabled # 是否启动 UI 界面,默认为 true;

3.服务启动与停止

  1. cd /usr/local/livy/bin

  2. ./livy-server start # 启动服务

  3. ./livy-server stop # 停止服务

4.访问界面

  1. 访问页面http://bigdatatest-1:8999 # 注意:默认端口是8998,使用Ambari部署集群时的livy服务端口是8999


04

使用Postman测试工具了解Livy接口

Livy官文档:http://livy.apache.org/docs/latest/rest-api.html

1.新建session

  1. post http://bigdatatest-1:8999/sessions


2.查看session状态

  1. GET http://bigdatatest-1:8999/sessions


3.提交代码片段

  1. POST http://bigdatatest-1:8999/sessions/130/statements


  2. 注意:如果在同一个session,提交的代码片段是共享的,相当于在同一个spark shell里面进行操作的

  3. sesssions/130 :130这个数字是开始新建session(即第一步的结果)时得到的id值


4.查看代码运行结果

  1. GET http://bigdatatest-1:8999/sessions/0/statements/0


  2. 注意:statements/0 :后面的0是第3步提交代码片段返回的id值


5.杀掉session

  1. delete http://bigdatatest-1:8999/sessions/130


6.提交spark批处理任务

  1. POST http://bigdatatest-1:8999/batches


注意:示例中使用的SparkPi例子,该jar包路径在$SPARK_HOME/examples/jars/下

7.查看Spark任务结果

  1. GET http://bigdatatest-1:8999/batches


05

Livy Java API

使用Java API只需要以下几个步骤即可:

1.添加maven依赖

2.实现 org.apache.livy.Job 接口

3.基于URL地址构建LivyClient

4.上传依赖jar包 ---->注意如果jar包文件过大会抛出异常

5.提交Livy任务并获取结果

6.关闭客户端

1.添加mavan依赖

  1. <dependency>

  2. <groupId>org.apache.livy</groupId>

  3. <artifactId>livy-client-http</artifactId>

  4. <version>0.5.0-incubating</version>

  5. </dependency>

2.实现Job接口

  1. package com.ud.livy;


  2. import org.apache.livy.Job;

  3. import org.apache.livy.JobContext;

  4. import org.apache.spark.api.java.JavaPairRDD;

  5. import org.apache.spark.api.java.JavaRDD;

  6. import org.apache.spark.api.java.function.*;

  7. import org.slf4j.Logger;

  8. import org.slf4j.LoggerFactory;

  9. import scala.Tuple2;


  10. import java.util.ArrayList;

  11. import java.util.Arrays;

  12. import java.util.List;


  13. /**

  14. * Author:lcp

  15. * CreateTime:2019/5/31 10:50

  16. * Descriptor:

  17. */

  18. public class TestJob implements Job {

  19. private static Logger logger = LoggerFactory.getLogger(TestJob.class);


  20. @Override

  21. public List<String> call(JobContext jobContext) throws Exception {

  22. JavaRDD<String> stringJavaRDD = jobContext.sc().parallelize(Arrays.asList("hello", "are", "you", "spark", "java", "java", "spark", "hadoop"));

  23. logger.info("样本数据:" + stringJavaRDD.first());

  24. JavaPairRDD<String, Integer> stringIntegerJavaPairRDD = stringJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {

  25. @Override

  26. public Tuple2<String, Integer> call(String s) throws Exception {

  27. return new Tuple2<>(s, 1);

  28. }

  29. }).reduceByKey(new Function2<Integer, Integer, Integer>() {

  30. @Override

  31. public Integer call(Integer integer, Integer integer2) throws Exception {

  32. return integer + integer2;

  33. }

  34. });

  35. List<Tuple2<String, Integer>> collect = stringIntegerJavaPairRDD.collect();

  36. List<String> list = new ArrayList<>();

  37. for (Tuple2<String,Integer> result:collect){

  38. list.add("key:"+ result._1+" value:"+result._2);

  39. }


  40. return list;

  41. }

  42. }

3.构建Client,上传Jar包,提交作业获取结果

  1. package com.ud.livy;


  2. import org.apache.livy.LivyClient;

  3. import org.apache.livy.LivyClientBuilder;

  4. import org.slf4j.Logger;

  5. import org.slf4j.LoggerFactory;

  6. import java.io.File;

  7. import java.net.URI;

  8. import java.util.List;


  9. /**

  10. * Author:lcp

  11. * CreateTime:2019/5/31 10:57

  12. * Descriptor:

  13. */

  14. public class LivyMain {

  15. private static Logger logger = LoggerFactory.getLogger(LivyMain.class);


  16. public static void main(String[] args) {

  17. LivyClient client = null;

  18. try {

  19. client = new LivyClientBuilder()

  20. .setURI(new URI("http://bigdatatest-1:8999"))

  21. .setConf("spark.debug.maxToStringFields", "10000000")

  22. .setConf("spark.sql.parquet.binaryAsString", "true")

  23. .setConf("spark.sql.parquet.writeLegacyForma", "true")

  24. .setConf("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  25. .setConf("spark.kryoserializer.buffer", "256")

  26. .setConf("spark.yarn.queue", "llap")

  27. .setConf("spark.app.name", "sparkTestSpark")

  28. .setConf("spark.driver.memory", "2g")

  29. .setConf("spark.executor.memory", "5g")

  30. .setConf("spark.executor.instances", "3")

  31. .setConf("spark.executor.cores", "4")

  32. .setConf("spark.default.parallelism", "500")

  33. .build();


  34. //注意:使用uploadJar()方法将会在每次运行程序的时候上传jar,使用addJar可加快运行速度,一般需要先把jar包上传至hdfs

  35. client.addJar(new URI("hdfs://bigdatatest-1:8020/bigdata-backtrack-final-0.0.1-SNAPSHOT.jar"));

  36. // client.uploadJar(new File("D:\\GitCode\\livyTest\\target\\bigdata-backtrack-final-0.0.1-SNAPSHOT.jar")).get();

  37. List<String> results = ( List<String>) client.submit(new TestJob()).get();

  38. System.out.println(results);

  39. } catch (Exception e) {

  40. logger.error("抛出异常:",e);

  41. } finally {

  42. if(client!=null){

  43. client.stop(true);

  44. }

  45. }


  46. }

  47. }

4.结果展示


您可能也对以下帖子感兴趣

文章有问题?点此查看未经处理的缓存